package com.bieber.common.compute;

import com.bieber.common.*;
import com.bieber.common.model.BizOrder;
import com.bieber.common.model.ConsumerPerson;
import com.bieber.common.node.IgniteServer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.compute.ComputeJob;

import javax.cache.Cache;
import javax.cache.CacheException;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Created by bieber on 2015/9/5.
 */
public class ComputePersonOrderJob implements ComputeJob, Callable<Object> {

    private int offset;

    private int pageSize;

    private volatile boolean isCancel;
    
    private String orderCacheName;

    public ComputePersonOrderJob(int offset, int pageSize,String orderCacheName) {
        this.offset = offset;
        this.pageSize = pageSize;
        this.orderCacheName = orderCacheName;
    }

    @Override
    public void cancel() {
        isCancel=true;
    }

    @Override
    public Object execute() throws IgniteException {
        if(isCancel){
            return null;
        }
        IgniteCache<Integer,BizOrder> orderCache =  IgniteServer.getCurrentIgnite().cache(orderCacheName);
        IgniteCache<Integer,PersonOrder> personOrderIgniteCache = IgniteServer.getCurrentIgnite().getOrCreateCache(Constants.PERSON_ORDER_CACHE);
        String personQuery = "1=1 order by id desc limit ?,?";
        StringBuffer sqlBuffer = new StringBuffer("select");
        sqlBuffer.append(" o.personId,sum(o.amount),count(*),p.name from \"");
        sqlBuffer.append(orderCacheName).append("\".").append(BizOrder.class.getSimpleName()).append(" o,");
        sqlBuffer.append(" (select id,name from \"").append(Constants.PERSON_CACHE).append("\".").append(ConsumerPerson.class.getSimpleName()).append(" limit ?,?").append(") p ");
        sqlBuffer.append("where p.id=o.personId group by o.personId");
        SqlFieldsQuery sql = new SqlFieldsQuery(sqlBuffer.toString());
        sql.setArgs(offset,pageSize);
        QueryCursor<List<?>> cursor = orderCache.query(sql);
        for(List<?> row :cursor){
            PersonOrder personOrder = new PersonOrder();
            personOrder.setId((Integer) row.get(0));
            personOrder.setAmount((Double) row.get(1));
            personOrder.setName((String) row.get(3));
            personOrder.setCount((Long) row.get(2));
            personOrderIgniteCache.putIfAbsent(personOrder.getId(),personOrder);
        }
        return pageSize;
    }
    @Override
    public Object call() throws Exception {
        return execute();
    }
}
